from ma.commons.core.reducetaskrunner import ReduceTaskRunner
from ma.commons.core.abstractreducer import AbstractReducer
from ma.commons.core.processrunner import ProcessRunner
import ma.commons.core.constants as constants
import ma.const
import ma.log

import sys
import pickle
import time
import os
import traceback

# TODO: Fix the loggers ... since they are running on a separate process


class MRPlusReduceTaskRunner(ReduceTaskRunner):
    """Runs one MR+ reduce task inside its own worker process.

    Extends the plain ReduceTaskRunner with MR+ bookkeeping: it reports
    back which globally-known keys were never seen in this task's input,
    which keys the user reducer flagged as unchanged, and which keys
    crossed the configured threshold value.
    """

    def __init__(self, tip_info, user_module_name, user_class_name):
        """Initialize the runner for a single reduce task-in-progress.

        tip_info         -- task descriptor (job_id, task_id, input_data,
                            list_of_all_keys, struct_data, ...)
        user_module_name -- module that holds the user's reducer class
        user_class_name  -- name of the user's reducer class
        """
        ReduceTaskRunner.__init__(self, tip_info, user_module_name, user_class_name)

        self.__log = ma.log.get_logger("ma.codes")

    def startTask(self):
        """Run the user-defined reduce code over the shuffled inputs.

        Reads every shuffled input file, runs the user reducer (one
        thread per key, joined immediately), and writes the resulting
        dictionary to the local output file.

        Returns [False] if writing the output fails, otherwise
        [True, list_of_not_seen_keys, list_of_not_changed_keys,
        list_of_thresholded_keys].
        Calls os._exit(-1) if an input file cannot be read.
        """

        input_dest_dir = ma.const.JobsXmlData.get_filepath_str_data(ma.const.xml_local_input_temp_dir, self.tip_info.job_id) + os.sep

        list_of_dict = []
        list_of_keys = []   # ordered, duplicate-free union of all input keys
        seen_keys = set()   # O(1) membership companion to list_of_keys

        # lists tracking changes in keys, returned to the caller
        list_of_not_seen_keys = []
        list_of_not_changed_keys = []
        list_of_thresholded_keys = []

        for input_data in self.tip_info.input_data:
            # input_data[1] selects whether this input came from a map or
            # a previous reduce; the filename pattern differs accordingly
            if input_data[1] == constants.MAP:
                filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_map_output_filename, input_data[0], input_data[2])
            else:
                filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_output_filename, input_data[0], input_data[2])

            # filepath for the shuffled input
            filepath = input_dest_dir + filename

            try:
                # 'with' guarantees the handle is closed even if read() raises
                with open(filepath, 'r') as f:
                    filecontents = f.read()

                self.__log.info('Input file to reduce %d read completely: %s', self.tip_info.task_id, filepath)

                # SECURITY NOTE: eval() executes arbitrary code from the
                # input file; tolerable only because the file is produced by
                # our own map/reduce workers. If the payload is guaranteed
                # to be Python literals, ast.literal_eval would be safer.
                d = eval(filecontents)
                self.__log.debug('Input file to reduce %d shifted to dictionary', self.tip_info.task_id)
                list_of_dict.append(d)
                self.__log.debug('Dictionary of reduce %d shifted to the list of dicts', self.tip_info.task_id)

                # add previously unseen keys, preserving first-seen order
                # (set lookup avoids the old O(n^2) list membership scan)
                for key in d:
                    if key not in seen_keys:
                        seen_keys.add(key)
                        list_of_keys.append(key)
                self.__log.debug('Adjusted list of keys for Reduce %d', self.tip_info.task_id)

            except IOError as err_msg:
                self.__log.error("Error while reading reduce input or running task: %s", err_msg)
                os._exit(-1)

        self.__log.debug('Read all files into the dictionary for Reduce %d', self.tip_info.task_id)

        # build the list of globally-known keys absent from this input
        if self.tip_info.list_of_all_keys is not None:
            for key in self.tip_info.list_of_all_keys:
                if key not in seen_keys:
                    print("------ Reduce ID %d Job ID %d -----> LIST OF NOT SEEN KEYS - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key)))
                    list_of_not_seen_keys.append(key)

        self.__log.debug('List of keys that weren\'t encountered: %s', list_of_not_seen_keys)
        self.__log.info('Number of keys for reduce %d: %d', self.tip_info.task_id, len(list_of_keys))

        # constants governing change detection and thresholding
        change_margin = ma.const.JobsXmlData.get_float_data(ma.const.xml_est_reduce_margin, self.tip_info.job_id)
        min_inputs_for_change = ma.const.JobsXmlData.get_int_data(ma.const.xml_min_inputs_for_change, self.tip_info.job_id)
        threshold_value = ma.const.JobsXmlData.get_float_data(ma.const.xml_threshold_value, self.tip_info.job_id)

        # get the time scaleup value for reduce computation
        reduce_time_scaleup = ma.const.JobsXmlData.get_float_data(ma.const.xml_reduce_secs_inp_scaleup, self.tip_info.job_id)

        if reduce_time_scaleup > 0:
            self.__log.warning('Reduce time scale-up: reduce ID %d Job %d: %f', self.tip_info.task_id, self.tip_info.job_id, reduce_time_scaleup)

        # iterate through all keys
        for key in list_of_keys:
            # collate this key's values from all input dictionaries
            # (renamed from 'list', which shadowed the builtin)
            values = []
            for d in list_of_dict:
                if key in d:
                    values += d[key]

            # one-element list so the user reducer can report a flag back
            if_changed = [False]
            user_vars = [change_margin, if_changed, threshold_value, reduce_time_scaleup]

            # instantiate the user reducer; getattr replaces the old
            # string-built eval() — same attribute lookup and call, without
            # executing a constructed code string
            th = getattr(self.user_class, self.user_class_name)(key, values, self.key_value_dict, user_vars)
            self.threads.append(th)

            # check if the Reducer is thresholdable
            thresholdable = th.thresholdable

            # start computation as thread and wait for finish
            th.start()
            th.join()

            # if the value has been reported by the reduce worker as not
            #    changed, and the number of inputs to note this change is
            #    enough, add to the list of not changed keys
            if not if_changed[0] and len(values) >= min_inputs_for_change:
                print("------ Reduce ID %d Job ID %d -----> KEY NOT CHANGED - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key)))
                list_of_not_changed_keys.append(key)

            # if there is a valid threshold value and the Reducer is Thresholdable
            if threshold_value is not None and thresholdable == AbstractReducer.THRESHOLDABLE:
                # if the key has been added to the dictionary by the reduce worker
                if key in self.key_value_dict:
                    final_key_value = self.key_value_dict[key][0]

                    # if the key's value has crossed the threshold
                    if final_key_value > threshold_value:
                        print("------ Reduce ID %d Job ID %d -----> KEY THRESHOLDED - %s" % (self.tip_info.task_id, self.tip_info.job_id, str(key)))
                        list_of_thresholded_keys.append((key, time.time(), self.tip_info.task_id, self.tip_info.struct_data))

        self.__log.debug('No. of keys out of %d those didn\'t change: %s', len(list_of_keys), len(list_of_not_changed_keys))

        self.__log.info('Actual processing on reduce %d ENDED', self.tip_info.task_id)

        if not self.key_value_dict:
            print('---------------------> Hey! Empty dictionary being written as reduce output', self.tip_info.task_id, self.tip_info.input_data)

        # write the dictionary to the output file
        if not self.writeOutputToLocalFile():
            # some error occurred
            return [ False ]
        else:
            # ran fine
            return [ True, list_of_not_seen_keys, list_of_not_changed_keys, list_of_thresholded_keys ]
            

if __name__ == "__main__":
    # Process entry point: the process creator passes the user module/class
    # names and the job/task ids on the command line; the ReduceTIP itself
    # arrives pickled in a temp file.
    try:
        print("Running MRPlusReduceTaskRunner process pid", os.getpid())

        if len(sys.argv) == 5:
            user_module_name = sys.argv[1]
            user_class_name = sys.argv[2]
            job_id = sys.argv[3]
            task_id = sys.argv[4]

            # get the filename which holds the reduce compute misc. output
            proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, job_id, task_id)

            # unpickle the argument list; first element is the ReduceTIP
            # (renamed from 'list', which shadowed the builtin; plain
            # indexing replaces the explicit __getitem__ call)
            pickled_args = ProcessRunner.pull_pickled_data(proc_temp_filename, job_id)
            reducetipinfo = pickled_args[0]

            task_runner = MRPlusReduceTaskRunner(reducetipinfo, user_module_name, user_class_name)
            # open files, process, write output
            ret = task_runner.startTask()

            print("Reduce runner %d of job-id %d successful: %s" % (reducetipinfo.task_id, reducetipinfo.job_id, str(ret[0])))

            if not ret[0]:
                # some error occurred
                os._exit(-1)
            else:
                # pack and pickle the lists back to the ReduceTIP
                list_of_not_seen_keys = ret[1]
                list_of_not_changed_keys = ret[2]
                list_of_thresholded_keys = ret[3]

                # pickle the lists and write them to the temp file
                result_lists = [list_of_not_seen_keys, list_of_not_changed_keys, list_of_thresholded_keys]
                proc_temp_filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_reduce_proc_temp_filename, reducetipinfo.job_id, reducetipinfo.task_id)
                ProcessRunner.store_pickled_data(result_lists, proc_temp_filename, reducetipinfo.job_id)

                # ran fine
                os._exit(0)
        else:
            print('Wrong number of arguments for ReduceTaskRunner process')
            os._exit(-1)

    except Exception as err_msg:
        # Bug fix: traceback.print_exception() requires arguments and would
        # itself raise TypeError here, masking the real error; print_exc()
        # prints the exception currently being handled.
        traceback.print_exc()
        print("MRPlusReduceTaskRunner:__main__ Exception in running reduce: %s" % err_msg)
        os._exit(-1)
